Add S3 Enrich processor to merge ML batch job output with source inputs #5992
05e390c to a6cddc2
dlvenable left a comment
Thank you @Zhangxunmt for this change!
I left some comments. One thing that might help reduce the amount of work in this PR is to move all the common code from the S3 source into s3-common in a separate PR.
a7e549a to 32673a9
dlvenable left a comment
Thanks @Zhangxunmt, I have a few more comments.
| */ | ||
| public Cache<String, Event> getOrLoadRecordCache(String s3Url, Supplier<Cache<String, Event>> loader) { | ||
| return s3Cache.get(s3Url, key -> { | ||
| LOG.info("Loading S3 data for: {}", key); |
This should probably be a debug log. It will happen too often for normal logging.
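A minimal sketch of the get-or-load pattern with the load message moved to a debug-level log. It is not the PR's code: `ConcurrentHashMap` stands in for the real cache and `java.util.logging` for the project's logger (`Level.FINE` is roughly its debug level), and the class name `RecordCacheSketch` is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the cache wrapper under review.
final class RecordCacheSketch<V> {
    private static final Logger LOG = Logger.getLogger(RecordCacheSketch.class.getName());

    private final Map<String, V> cache = new ConcurrentHashMap<>();

    // Returns the cached value for s3Url, invoking the loader only on a miss.
    V getOrLoad(final String s3Url, final Supplier<V> loader) {
        return cache.computeIfAbsent(s3Url, key -> {
            // Logged at FINE so that frequent cache loads do not flood normal logs.
            LOG.log(Level.FINE, "Loading S3 data for: {0}", key);
            return loader.get();
        });
    }
}
```

The loader runs once per key, so the message is emitted on cache misses only, but with many distinct S3 objects that can still be frequent.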
| if (matcher.matches() && matcher.groupCount() >= 1) { | ||
| final String baseName = matcher.group(1); | ||
| if (baseName != null && !baseName.isBlank()) { | ||
| return baseName + ".jsonl"; |
Should this jsonl extension be configurable?
Let's reuse this config for the extension to make it configurable. The S3 sink uses it to define the extension of the generated file:
codec:
ndjson:
extension: jsonl
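One way the suggestion could look, as a sketch only: the extension is passed in instead of being hard-coded as ".jsonl". The class name `SourceKeyResolverSketch` and its constructor parameters are assumptions for illustration; the actual pattern and configuration wiring in the PR are not shown in this thread.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: derives the source object key from an output object key.
final class SourceKeyResolverSketch {
    private final Pattern outputKeyPattern;
    private final String extension;

    // extension is given without a leading dot, e.g. "jsonl" as in the codec config above.
    SourceKeyResolverSketch(final Pattern outputKeyPattern, final String extension) {
        this.outputKeyPattern = outputKeyPattern;
        this.extension = extension;
    }

    // Returns the source key, or empty if the output key does not match the pattern.
    Optional<String> resolve(final String outputKey) {
        final Matcher matcher = outputKeyPattern.matcher(outputKey);
        if (matcher.matches() && matcher.groupCount() >= 1) {
            final String baseName = matcher.group(1);
            if (baseName != null && !baseName.isBlank()) {
                return Optional.of(baseName + "." + extension);
            }
        }
        return Optional.empty();
    }
}
```

With an assumed pattern such as `(.+)-[^-/]+\.jsonl\.out`, the key `batch/input-abc123.jsonl.out` would resolve to `batch/input.jsonl`, or to `batch/input.ndjson` if the configured extension were `ndjson`.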
| * @param targetEvent the event to enrich (output record) | ||
| * @param sourceEvent the event containing source data (from cache) | ||
| */ | ||
| private void mergeData(Event targetEvent, Event sourceEvent) { |
The Event class has a merge method. Can it work for you? If not, is there a way it could be configured for your needs?
| @ExtendWith(MockitoExtension.class) | ||
| class S3ObjectReferenceResolverTest { | ||
|
|
||
| private static final String BUCKET_NAME = "test-bucket"; |
Make this and KEY_PATH non-static fields. Initialize them with random values in @BeforeEach.
|
|
||
| @Test | ||
| void resolve_throws_when_s3_key_is_blank() { | ||
| when(event.get(KEY_PATH, String.class)).thenReturn(" "); |
I think you also need this:
when(bucketOption.getName()).thenReturn(bucketName);
Otherwise you may not be testing what you mean to.
|
|
||
| @Test | ||
| void resolve_throws_when_s3_key_is_null() { | ||
| when(event.get(KEY_PATH, String.class)).thenReturn(null); |
I think you also need this:
when(bucketOption.getName()).thenReturn(bucketName);
Otherwise you may not be testing what you mean to.
| private final Counter numberOfRecordsSuccessCounter; | ||
| private final Counter numberOfRecordsFailedCounter; | ||
|
|
||
| @DataPrepperPluginConstructor |
Maybe we should also make this @Experimental while we work on it in case we want to make some changes.
Yes, agreed. I will add the Experimental annotation to it.
| * @throws UnsupportedOperationException if the current Event does not support merging. | ||
| * @since 2.11 | ||
| */ | ||
| void merge(Event other, List<String> keys); |
| private final List<ScanObjectWorker> workers; | ||
|
|
||
| public S3ScanService(final S3SourceConfig s3SourceConfig, | ||
| public S3ScanService(final S3SourceConfig s3SourceConfig, |
| * @param keys the list of keys to selectively merge | ||
| * @throws IllegalArgumentException if the input event is not compatible to merge. | ||
| * @throws UnsupportedOperationException if the current Event does not support merging. | ||
| * @since 2.11 |
| * @since 2.11 | |
| * @since 2.15 |
The next version is 2.15.
Signed-off-by: Xun Zhang <xunzh@amazon.com>
|
|
||
| import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; | ||
|
|
||
| @Experimental |
Added @Experimental here for the new processor class.
dlvenable left a comment
Thanks @Zhangxunmt for this great new processor!
| for (final String key : keys) { | ||
| final Object value = other.get(key, Object.class); | ||
| if (value != null) { | ||
| put(key, value); |
If the data has something like { "key": null }, do you think users will want it merged?
Good question. For the S3 enrich use case (adding data from a lookup), skipping nulls makes sense: you don't want a missing or null enrichment field to overwrite a valid value in the target event. But in general, a user might explicitly want to merge a null to clear a field.
Given that the only current consumer is S3EnrichProcessor and the semantics are enrichment, I think we can keep skipping nulls. But I'll add a comment in the code in the next PR to make the decision explicit, so that this can be enhanced for other cases in the future.
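To make the null-skipping behavior concrete, here is a small sketch that uses plain maps in place of Event. The class name `SelectiveMergeSketch` is hypothetical, and the loop only mirrors the snippet under review; it is not the Event implementation itself.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a selective merge that skips null values.
final class SelectiveMergeSketch {
    // Copies only the listed keys from other into target.
    static void merge(final Map<String, Object> target,
                      final Map<String, Object> other,
                      final List<String> keys) {
        for (final String key : keys) {
            final Object value = other.get(key);
            // A null (or absent) value never overwrites what the target already holds.
            if (value != null) {
                target.put(key, value);
            }
        }
    }

    public static void main(final String[] args) {
        final Map<String, Object> target = new HashMap<>();
        target.put("label", "keep");

        final Map<String, Object> other = new HashMap<>();
        other.put("label", null);   // like { "label": null } in the lookup data
        other.put("score", 0.9);
        other.put("extra", "not requested");

        merge(target, other, List.of("label", "score"));
        System.out.println(target);
    }
}
```

Here `label` keeps its existing value, `score` is added, and `extra` is ignored because it is not in the key list. Supporting "merge a null to clear a field" would need an explicit option, since this loop cannot tell a missing key from a null value.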
627a6b7 into opensearch-project:main
Description
This PR implements the proposed S3 Enricher processor, which merges data from S3 into the events in a Data Prepper pipeline.
The processor can be used to merge ML batch job results stored under the S3 key "<source_file_basename-xxxxx>.jsonl.out" with selected fields from the source data under the S3 key "source_file_basename.jsonl".
This is the config of this processor: